在昨天的文章中,我們探討了無狀態的 ProjectionExec 和 FilterExec 算子。今天我們將學習需要狀態管理的聚合算子(AggregateExec)。
考慮這個查詢:
SELECT department, COUNT(*), SUM(salary), AVG(age)
FROM employees
GROUP BY department;
聚合操作的特殊挑戰:
department
維護獨立的聚合狀態今天的學習目標:
// datafusion/physical-plan/src/aggregates/mod.rs
#[derive(Debug, Clone)]
pub struct AggregateExec {
/// 聚合模式(Partial、Final、Single 等)
mode: AggregateMode,
/// GROUP BY 表達式
group_by: PhysicalGroupBy,
/// 聚合函數表達式(如 SUM、COUNT、AVG)
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
/// FILTER 子句(條件聚合)
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
/// 輸入執行計劃
input: Arc<dyn ExecutionPlan>,
/// 輸出 Schema
schema: SchemaRef,
// ... 其他欄位
}
關鍵欄位說明:
1. AggregateMode - 聚合模式
pub enum AggregateMode {
/// 部分聚合:第一階段,可並行執行
Partial,
/// 最終聚合:第二階段,合併部分聚合結果
Final,
/// 單階段聚合:在單一算子中完成全部聚合
Single,
// ... 其他變體
}
2. PhysicalGroupBy - 分組表達式
對於 GROUP BY department
:
expr
包含:[(Column("department"), "department")]
3. aggr_expr - 聚合函數表達式
每個 AggregateFunctionExpr
代表一個聚合函數,如 SUM(salary)
或 COUNT(*)
。
fn execute_typed(&self, partition: usize, context: Arc<TaskContext>) -> Result<StreamType> {
// 情況 1: 沒有 GROUP BY(全局聚合)
if self.group_by.expr.is_empty() {
return Ok(StreamType::AggregateStream(...));
}
// 情況 2: 有 GROUP BY + LIMIT(Top-K 優化)
if let Some(limit) = self.limit {
return Ok(StreamType::GroupedPriorityQueue(...));
}
// 情況 3: 普通的 GROUP BY(Hash 聚合)
Ok(StreamType::GroupedHash(...))
}
考慮分散式場景:數據分佈在 3 個節點,每個節點 1000 萬行。
單階段方案(效率低):
兩階段方案(高效):
第一階段 (Partial):
節點 1: 1000萬行 → 本地聚合 → 1000 個分組結果
節點 2: 1000萬行 → 本地聚合 → 1000 個分組結果
節點 3: 1000萬行 → 本地聚合 → 1000 個分組結果
第二階段 (Final):
協調節點: 合併 3000 個部分結果 → 1000 個最終分組
優勢:
- 網路傳輸量減少到幾 MB(500 倍以上)
- 充分利用各節點的並行計算能力
以 AVG(salary)
為例:
原始數據:
department | salary
-----------+--------
IT | 80000
IT | 90000
Sales | 60000
Partial 聚合後:
department | sum_salary | count
-----------+------------+-------
IT | 170000 | 2
Sales | 60000 | 1
注意:輸出的是 sum 和 count,而非平均值!
為何需要中間狀態?
如果直接輸出平均值:
錯誤:
節點 1: IT 平均 85000 (3 個樣本)
節點 2: IT 平均 88000 (2 個樣本)
合併: (85000 + 88000) / 2 = 86500 ❌ 錯誤!
正確:
節點 1: sum=255000, count=3
節點 2: sum=176000, count=2
合併: (255000 + 176000) / (3 + 2) = 86200 ✓
輸入(來自多個 Partial 階段):
department | sum_salary | count
-----------+------------+-------
IT | 255000 | 3 (節點 1)
IT | 176000 | 2 (節點 2)
Sales | 125000 | 2 (節點 1)
Final 聚合:
IT: sum = 431000, count = 5 → avg = 86200
Sales: sum = 125000, count = 2 → avg = 62500
方面 | Partial 模式 | Final 模式 |
---|---|---|
輸入 | 原始數據行 | 中間狀態 |
處理方法 | update_batch() |
merge_batch() |
輸出 | 中間狀態 | 最終聚合值 |
並行性 | 高(任意分區) | 受限(需按鍵分區) |
DataFusion 使用 Hash Table 管理分組:
pub trait GroupValues {
/// 將新的分組值加入,返回每個值對應的分組索引
fn intern(
&mut self,
cols: &[ArrayRef],
group_indices: &mut Vec<usize>,
) -> Result<()>;
fn len(&self) -> usize;
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
}
intern 方法的作用
// 輸入 batch:
// department | salary
// -----------+--------
// IT | 80000
// Sales | 60000
// IT | 90000
group_values.intern(&[department_array], &mut group_indices)?;
// 執行過程:
// 第 0 行 (IT): hash查找 → 沒找到 → 創建 group 0 → indices[0] = 0
// 第 1 行 (Sales): hash查找 → 沒找到 → 創建 group 1 → indices[1] = 1
// 第 2 行 (IT): hash查找 → 找到 group 0 → indices[2] = 0
// 結果: group_indices = [0, 1, 0]
這個 group_indices
會傳給累積器,告訴它每一行應該更新哪個分組。
fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> {
// 1. 評估 GROUP BY 表達式
let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
// 2. 評估聚合函數的輸入
let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
// 3. 確定每行對應的分組索引
self.group_values.intern(&group_by_values, &mut self.current_group_indices)?;
// 4. 更新每個累積器
for (accumulator, values) in accumulators.iter_mut().zip(input_values.iter()) {
accumulator.update_batch(&self.current_group_indices, values)?;
}
Ok(())
}
pub trait Accumulator: Send + Sync + Debug {
/// 使用新的輸入值更新累積器狀態
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
/// 返回最終的聚合結果
fn evaluate(&mut self) -> Result<ScalarValue>;
/// 返回累積器的中間狀態(用於 Partial 模式)
fn state(&mut self) -> Result<Vec<ScalarValue>>;
/// 合併其他累積器的狀態(用於 Final 模式)
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
}
#[derive(Debug)]
pub struct SumAccumulator {
sum: Option<i64>,
}
impl Accumulator for SumAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let array = as_primitive_array::<Int64Type>(&values[0])?;
if let Some(batch_sum) = arrow::compute::sum(array) {
self.sum = Some(self.sum.unwrap_or(0) + batch_sum);
}
Ok(())
}
fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(self.sum))
}
fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![ScalarValue::Int64(self.sum)])
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
let array = as_primitive_array::<Int64Type>(&states[0])?;
if let Some(batch_sum) = arrow::compute::sum(array) {
self.sum = Some(self.sum.unwrap_or(0) + batch_sum);
}
Ok(())
}
}
#[derive(Debug)]
pub struct AvgAccumulator {
sum: Option<f64>,
count: u64,
}
impl Accumulator for AvgAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let array = as_primitive_array::<Float64Type>(&values[0])?;
if let Some(batch_sum) = arrow::compute::sum(array) {
self.sum = Some(self.sum.unwrap_or(0.0) + batch_sum);
}
self.count += (array.len() - array.null_count()) as u64;
Ok(())
}
fn evaluate(&mut self) -> Result<ScalarValue> {
match self.sum {
Some(sum) if self.count > 0 => {
Ok(ScalarValue::Float64(Some(sum / self.count as f64)))
}
_ => Ok(ScalarValue::Float64(None)),
}
}
fn state(&mut self) -> Result<Vec<ScalarValue>> {
// Partial 模式: 返回 sum 和 count
Ok(vec![
ScalarValue::Float64(self.sum),
ScalarValue::UInt64(Some(self.count)),
])
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
// Final 模式: 合併 sum 和 count
let sum_array = as_primitive_array::<Float64Type>(&states[0])?;
let count_array = as_primitive_array::<UInt64Type>(&states[1])?;
if let Some(batch_sum) = arrow::compute::sum(sum_array) {
self.sum = Some(self.sum.unwrap_or(0.0) + batch_sum);
}
if let Some(batch_count) = arrow::compute::sum(count_array) {
self.count += batch_count;
}
Ok(())
}
}
AVG 的完整流程
Partial 階段:
update_batch([80, 90, 85]) → sum=255, count=3
state() → [Float64(255), UInt64(3)]
Final 階段:
merge_batch([255], [3]) → sum=255, count=3
merge_batch([180], [2]) → sum=435, count=5
evaluate() → Float64(87.0)
對於有 GROUP BY
的查詢,為每個分組創建獨立的 Accumulator
效率低。GroupsAccumulator
同時管理多個分組:
pub trait GroupsAccumulator: Send + Sync {
/// 更新多個分組的狀態
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize], // 關鍵:每個值對應的分組索引
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()>;
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>;
}
性能優勢
1000 個分組,每組 10000 行:
方案 1: 1000 個獨立 Accumulator
- 1000 次虛擬函數調用
- 處理時間: ~100 ms
方案 2: 單個 GroupsAccumulator
- 1 次調用,內部向量化處理
- 處理時間: ~10 ms
性能提升: 10 倍
今天我們深入探討了 DataFusion 的聚合算子:
AggregateMode
、PhysicalGroupBy
、AggregateFunctionExpr
intern
方法分配分組索引update_batch
、merge_batch
、evaluate
、state
兩階段聚合是分散式數據處理的核心模式,不僅用於 DataFusion,也是 Spark、Flink 等系統的基礎設計。
明天我們將探討聚合算子 Part 2 - Hash vs Sort Aggregate,比較不同聚合策略的適用場景和性能權衡。